{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Window LAG\n",
    "\n",
    "## COVID-19 Data\n",
    "Notes on the data: This data was assembled based on work done by [Rodrigo Pombo](https://github.com/pomber/covid19) based on [John Hopkins University](https://systems.jhu.edu/research/public-health/ncov/), based on [World Health Organisation](https://www.who.int/health-topics/coronavirus). The data was assembled 21st April 2020 - there are no plans to keep this data set up to date."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4041\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@3023096e"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "val spark = {\n",
    "    NotebookSparkSession.builder()\n",
    "    .progress(false)\n",
    "    .appName(\"app09+\")\n",
    "    // .master(\"spark://192.168.31.31:7077\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.sql.warehouse.dir\", \n",
    "            \"hdfs://192.168.31.31:9000/user/hive/warehouse\") \n",
    "    .config(\"spark.cores.max\", \"4\") \n",
    "    .config(\"spark.executor.instances\", \"1\") \n",
    "    .config(\"spark.executor.cores\", \"2\") \n",
    "    .config(\"spark.executor.memory\", \"10g\") \n",
    "    .config(\"spark.shuffle.service.enabled\", \"false\") \n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\") \n",
    "    .config(\"spark.sql.catalogImplementation\", \"hive\")\n",
    "    .config(\"spark.sql.repl.eagerEval.enabled\", \"true\")\n",
    "    .config(\"spark.driver.allowMultipleContexts\", \"true\")\n",
    "    .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@193516e9"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "def sc = spark.sparkContext\n",
    "val hiveCxt = new org.apache.spark.sql.hive.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Credit to Aivean\n",
    "implicit class RichDF(val ds:DataFrame) {\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "        val data = ds.take(limit)\n",
    "        val header = ds.schema.fieldNames.toSeq        \n",
    "        val rows: Seq[Seq[String]] = data.map { row =>\n",
    "          row.toSeq.map {cell =>\n",
    "            val str = cell match {\n",
    "              case null => \"null\"\n",
    "              case binary: Array[Byte] => binary.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "              case array: Array[_] => array.mkString(\"[\", \", \", \"]\")\n",
    "              case seq: Seq[_] => seq.mkString(\"[\", \", \", \"]\")\n",
    "              case _ => cell.toString\n",
    "            }\n",
    "            if (truncate > 0 && str.length > truncate) {\n",
    "              // do not show ellipses for strings shorter than 4 characters.\n",
    "              if (truncate < 4) str.substring(0, truncate)\n",
    "              else str.substring(0, truncate - 3) + \"...\"\n",
    "            } else {\n",
    "              str\n",
    "            }\n",
    "          }: Seq[String]\n",
    "        }\n",
    "    publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 ${header.map(h => s\"<th>${escape(h)}</th>\").mkString}\n",
    "                </tr>\n",
    "                ${rows.map {row =>\n",
    "                  s\"<tr>${row.map{c => s\"<td>${escape(c)}</td>\" }.mkString}</tr>\"\n",
    "                }.mkString}\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mcovid\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, whn: string ... 3 more fields]\n",
       "\u001b[36mworld\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, continent: string ... 6 more fields]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val covid = hiveCxt.table(\"sqlzoo.covid\")\n",
    "val world = hiveCxt.table(\"sqlzoo.world\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Window Function\n",
    "The SQL Window functions include LAG, LEAD, RANK and NTILE. These functions operate over a \"window\" of rows - typically these are rows in the table that are in some sense adjacent."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Introducing the `covid` table\n",
    "\n",
    "The example uses a WHERE clause to show the cases in 'Italy' in March.\n",
    "\n",
    "**Modify the query to show data from Spain**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>day</th><th>confirmed</th><th>deaths</th><th>recovered</th>\n",
       "                </tr>\n",
       "                <tr><td>Spain</td><td>1</td><td>84</td><td>0</td><td>2</td></tr><tr><td>Spain</td><td>2</td><td>120</td><td>0</td><td>2</td></tr><tr><td>Spain</td><td>3</td><td>165</td><td>1</td><td>2</td></tr><tr><td>Spain</td><td>4</td><td>222</td><td>2</td><td>2</td></tr><tr><td>Spain</td><td>5</td><td>259</td><td>3</td><td>2</td></tr><tr><td>Spain</td><td>6</td><td>400</td><td>5</td><td>2</td></tr><tr><td>Spain</td><td>7</td><td>500</td><td>10</td><td>30</td></tr><tr><td>Spain</td><td>8</td><td>673</td><td>17</td><td>30</td></tr><tr><td>Spain</td><td>9</td><td>1073</td><td>28</td><td>32</td></tr><tr><td>Spain</td><td>10</td><td>1695</td><td>35</td><td>32</td></tr><tr><td>Spain</td><td>11</td><td>2277</td><td>54</td><td>183</td></tr><tr><td>Spain</td><td>12</td><td>2277</td><td>55</td><td>183</td></tr><tr><td>Spain</td><td>13</td><td>5232</td><td>133</td><td>193</td></tr><tr><td>Spain</td><td>14</td><td>6391</td><td>195</td><td>517</td></tr><tr><td>Spain</td><td>15</td><td>7798</td><td>289</td><td>517</td></tr><tr><td>Spain</td><td>16</td><td>9942</td><td>342</td><td>530</td></tr><tr><td>Spain</td><td>17</td><td>11748</td><td>533</td><td>1028</td></tr><tr><td>Spain</td><td>18</td><td>13910</td><td>623</td><td>1081</td></tr><tr><td>Spain</td><td>19</td><td>17963</td><td>830</td><td>1107</td></tr><tr><td>Spain</td><td>20</td><td>20410</td><td>1043</td><td>1588</td></tr><tr><td>Spain</td><td>21</td><td>25374</td><td>1375</td><td>2125</td></tr><tr><td>Spain</td><td>22</td><td>28768</td><td>1772</td><td>2575</td></tr><tr><td>Spain</td><td>23</td><td>35136</td><td>2311</td><td>2575</td></tr><tr><td>Spain</td><td>24</td><td>39885</td><td>2808</td><td>3794</td></tr><tr><td>Spain</td><td>25</td><td>49515</td><td>3647</td><td>5367</td></tr><tr><td>Spain</td><td>26</td><td>57786</td><td>4365</td><td>7015</td></tr><tr><td>Spain</td><td>27</td><td>65719</td><td>5138</td><td>9357</td></tr><tr><td>Spain</td><td>28</td><td>73235</td><td>5982</td><td>12285</td></tr><tr><td>Spain</td><td>29</td><td>80110</td><td>6803</td><td>14709</td></tr><tr><td>Spain</td><td>30</td><td>87956</td><td>7716</td><td>16780</td></tr><tr><td>Spain</td><td>31</td><td>95923</td><td>8464</td><td>19259</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(covid.withColumn(\"day\", dayofmonth(covid(\"whn\")))\n",
    "    .filter((covid(\"name\")===\"Spain\") && (month(covid(\"whn\"))===3))\n",
    "    .select(\"name\", \"day\", \"confirmed\", \"deaths\", \"recovered\")\n",
    "    .orderBy(\"day\")\n",
    "    .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Introducing the LAG function\n",
    "\n",
    "The LAG function is used to show data from the preceding row or the table. When lining up rows the data is partitioned by country name and ordered by the data whn. That means that only data from Italy is considered.\n",
    "\n",
    "**Modify the query to show confirmed for the day before.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>day</th><th>confirmed</th><th>mo</th><th>lag_cfrm</th>\n",
       "                </tr>\n",
       "                <tr><td>Italy</td><td>1</td><td>1694</td><td>3</td><td>null</td></tr><tr><td>Italy</td><td>2</td><td>2036</td><td>3</td><td>1694</td></tr><tr><td>Italy</td><td>3</td><td>2502</td><td>3</td><td>2036</td></tr><tr><td>Italy</td><td>4</td><td>3089</td><td>3</td><td>2502</td></tr><tr><td>Italy</td><td>5</td><td>3858</td><td>3</td><td>3089</td></tr><tr><td>Italy</td><td>6</td><td>4636</td><td>3</td><td>3858</td></tr><tr><td>Italy</td><td>7</td><td>5883</td><td>3</td><td>4636</td></tr><tr><td>Italy</td><td>8</td><td>7375</td><td>3</td><td>5883</td></tr><tr><td>Italy</td><td>9</td><td>9172</td><td>3</td><td>7375</td></tr><tr><td>Italy</td><td>10</td><td>10149</td><td>3</td><td>9172</td></tr><tr><td>Italy</td><td>11</td><td>12462</td><td>3</td><td>10149</td></tr><tr><td>Italy</td><td>12</td><td>12462</td><td>3</td><td>12462</td></tr><tr><td>Italy</td><td>13</td><td>17660</td><td>3</td><td>12462</td></tr><tr><td>Italy</td><td>14</td><td>21157</td><td>3</td><td>17660</td></tr><tr><td>Italy</td><td>15</td><td>24747</td><td>3</td><td>21157</td></tr><tr><td>Italy</td><td>16</td><td>27980</td><td>3</td><td>24747</td></tr><tr><td>Italy</td><td>17</td><td>31506</td><td>3</td><td>27980</td></tr><tr><td>Italy</td><td>18</td><td>35713</td><td>3</td><td>31506</td></tr><tr><td>Italy</td><td>19</td><td>41035</td><td>3</td><td>35713</td></tr><tr><td>Italy</td><td>20</td><td>47021</td><td>3</td><td>41035</td></tr><tr><td>Italy</td><td>21</td><td>53578</td><td>3</td><td>47021</td></tr><tr><td>Italy</td><td>22</td><td>59138</td><td>3</td><td>53578</td></tr><tr><td>Italy</td><td>23</td><td>63927</td><td>3</td><td>59138</td></tr><tr><td>Italy</td><td>24</td><td>69176</td><td>3</td><td>63927</td></tr><tr><td>Italy</td><td>25</td><td>74386</td><td>3</td><td>69176</td></tr><tr><td>Italy</td><td>26</td><td>80589</td><td>3</td><td>74386</td></tr><tr><td>Italy</td><td>27</td><td>86498</td><td>3</td><td>80589</td></tr><tr><td>Italy</td><td>28</td><td>92472</td><td>3</td><td>86498</td></tr><tr><td>Italy</td><td>29</td><td>97689</td><td>3</td><td>92472</td></tr><tr><td>Italy</td><td>30</td><td>101739</td><td>3</td><td>97689</td></tr><tr><td>Italy</td><td>31</td><td>105792</td><td>3</td><td>101739</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(covid.withColumn(\"day\", dayofmonth(covid(\"whn\")))\n",
    "     .withColumn(\"mo\", month(covid(\"whn\")))\n",
    "     .filter((covid(\"name\")===\"Italy\") && \n",
    "             (month(covid(\"whn\"))===3))\n",
    "     .withColumn(\"lag_cfrm\", lag(col(\"confirmed\"), 1)\n",
    "                 .over(Window.orderBy(\"day\").partitionBy(\"name\")))\n",
    "     .select(\"name\", \"day\", \"confirmed\", \"mo\", \"lag_cfrm\")\n",
    "     .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### LAG operation\n",
    "\n",
    "Here is the correct query showing the cases for the day before:\n",
    "\n",
    "```sql\n",
    "SELECT name, DAY(whn), confirmed,\n",
    "   LAG(confirmed, 1) OVER (partition by name ORDER BY whn) AS lag\n",
    " FROM covid\n",
    "WHERE name = 'Italy'\n",
    "AND MONTH(whn) = 3\n",
    "ORDER BY whn\n",
    "```\n",
    "\n",
    "Notice how the values in the LAG column match the value of the row diagonally above and to the left.\n",
    "\n",
    "name | DAY(whn) | confirmed | dbf\n",
    "------|---|------|----------\n",
    "Italy | 1 | **1694** | null\n",
    "Italy | 2 | 2036 | **1694**\n",
    "Italy | 3 | 2502 | 2036\n",
    "Italy | 4 | 3089 | 2502\n",
    "Italy | 5 | **3858** | 3089\n",
    "Italy | 6 | 4636 | **3858**\n",
    "Italy | 7 | 5883 | 4636\n",
    "Italy | 8 | 7375 | 5883\n",
    "Italy | 9 | 9172 | 7375\n",
    "Italy | 10 | 10149 | 9172\n",
    "... | | |"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Number of new cases\n",
    "\n",
    "The number of confirmed case is cumulative - but we can use LAG to recover the number of new cases reported for each day.\n",
    "\n",
    "**Show the number of new cases for each day, for Italy, for March.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>day</th><th>new</th>\n",
       "                </tr>\n",
       "                <tr><td>Italy</td><td>1</td><td>null</td></tr><tr><td>Italy</td><td>2</td><td>342</td></tr><tr><td>Italy</td><td>3</td><td>466</td></tr><tr><td>Italy</td><td>4</td><td>587</td></tr><tr><td>Italy</td><td>5</td><td>769</td></tr><tr><td>Italy</td><td>6</td><td>778</td></tr><tr><td>Italy</td><td>7</td><td>1247</td></tr><tr><td>Italy</td><td>8</td><td>1492</td></tr><tr><td>Italy</td><td>9</td><td>1797</td></tr><tr><td>Italy</td><td>10</td><td>977</td></tr><tr><td>Italy</td><td>11</td><td>2313</td></tr><tr><td>Italy</td><td>12</td><td>0</td></tr><tr><td>Italy</td><td>13</td><td>5198</td></tr><tr><td>Italy</td><td>14</td><td>3497</td></tr><tr><td>Italy</td><td>15</td><td>3590</td></tr><tr><td>Italy</td><td>16</td><td>3233</td></tr><tr><td>Italy</td><td>17</td><td>3526</td></tr><tr><td>Italy</td><td>18</td><td>4207</td></tr><tr><td>Italy</td><td>19</td><td>5322</td></tr><tr><td>Italy</td><td>20</td><td>5986</td></tr><tr><td>Italy</td><td>21</td><td>6557</td></tr><tr><td>Italy</td><td>22</td><td>5560</td></tr><tr><td>Italy</td><td>23</td><td>4789</td></tr><tr><td>Italy</td><td>24</td><td>5249</td></tr><tr><td>Italy</td><td>25</td><td>5210</td></tr><tr><td>Italy</td><td>26</td><td>6203</td></tr><tr><td>Italy</td><td>27</td><td>5909</td></tr><tr><td>Italy</td><td>28</td><td>5974</td></tr><tr><td>Italy</td><td>29</td><td>5217</td></tr><tr><td>Italy</td><td>30</td><td>4050</td></tr><tr><td>Italy</td><td>31</td><td>4053</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(covid.withColumn(\"day\", dayofmonth(covid(\"whn\")))\n",
    "     .filter((covid(\"name\")===\"Italy\") && \n",
    "             (month(covid(\"whn\"))===3))\n",
    "     .withColumn(\"new\", col(\"confirmed\") - lag(col(\"confirmed\"), 1).over(\n",
    "         Window.orderBy(\"day\").partitionBy(\"name\")))\n",
    "     .select(\"name\", \"day\", \"new\")\n",
    "     .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Weekly changes\n",
    "\n",
    "The data gathered are necessarily estimates and are inaccurate. However by taking a longer time span we can mitigate some of the effects.\n",
    "\n",
    "You can filter the data to view only Monday's figures **WHERE WEEKDAY(whn) = 0**.\n",
    "\n",
    "**Show the number of new cases in Italy for each week - show Monday only.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>whn</th><th>new</th>\n",
       "                </tr>\n",
       "                <tr><td>Italy</td><td>2020-01-26</td><td>null</td></tr><tr><td>Italy</td><td>2020-02-02</td><td>2</td></tr><tr><td>Italy</td><td>2020-02-09</td><td>1</td></tr><tr><td>Italy</td><td>2020-02-16</td><td>0</td></tr><tr><td>Italy</td><td>2020-02-23</td><td>152</td></tr><tr><td>Italy</td><td>2020-03-01</td><td>1539</td></tr><tr><td>Italy</td><td>2020-03-08</td><td>5681</td></tr><tr><td>Italy</td><td>2020-03-15</td><td>17372</td></tr><tr><td>Italy</td><td>2020-03-22</td><td>34391</td></tr><tr><td>Italy</td><td>2020-03-29</td><td>38551</td></tr><tr><td>Italy</td><td>2020-04-05</td><td>31259</td></tr><tr><td>Italy</td><td>2020-04-12</td><td>27415</td></tr><tr><td>Italy</td><td>2020-04-19</td><td>22609</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(covid.filter((covid(\"name\")===\"Italy\") && \n",
    "              (dayofweek(covid(\"whn\"))===1))\n",
    "     .withColumn(\"new\", col(\"confirmed\")-lag(col(\"confirmed\"), 1)\n",
    "                 .over(Window.orderBy(\"whn\").partitionBy(\"name\")))\n",
    "     .select(\"name\", \"whn\", \"new\")\n",
    "     .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. LAG using a JOIN\n",
    "\n",
    "You can JOIN a table using DATE arithmetic. This will give different results if data is missing.\n",
    "\n",
    "**Show the number of new cases in Italy for each week - show Monday only.**\n",
    "\n",
    "In the sample query we JOIN this week tw with last week lw using the DATE_ADD function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>whn</th><th>new</th>\n",
       "                </tr>\n",
       "                <tr><td>Italy</td><td>2020-01-26</td><td>null</td></tr><tr><td>Italy</td><td>2020-02-02</td><td>2</td></tr><tr><td>Italy</td><td>2020-02-09</td><td>1</td></tr><tr><td>Italy</td><td>2020-02-16</td><td>0</td></tr><tr><td>Italy</td><td>2020-02-23</td><td>152</td></tr><tr><td>Italy</td><td>2020-03-01</td><td>1539</td></tr><tr><td>Italy</td><td>2020-03-08</td><td>5681</td></tr><tr><td>Italy</td><td>2020-03-15</td><td>17372</td></tr><tr><td>Italy</td><td>2020-03-22</td><td>34391</td></tr><tr><td>Italy</td><td>2020-03-29</td><td>38551</td></tr><tr><td>Italy</td><td>2020-04-05</td><td>31259</td></tr><tr><td>Italy</td><td>2020-04-12</td><td>27415</td></tr><tr><td>Italy</td><td>2020-04-19</td><td>22609</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36ma\u001b[39m: \u001b[32mDataset\u001b[39m[\u001b[32mRow\u001b[39m] = [name: string, whn: string ... 3 more fields]"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val a = covid.filter((covid(\"name\")===\"Italy\") && \n",
    "                     (dayofweek(covid(\"whn\"))===1))\n",
    "\n",
    "(a.select(\"whn\", \"name\", \"confirmed\")\n",
    "     .join(a\n",
    "           .withColumn(\"whn\", to_date($\"whn\" + lit(\"7 days\").cast(CalendarIntervalType)))\n",
    "           .withColumnRenamed(\"confirmed\", \"confirmed2\")\n",
    "           .select(\"whn\", \"name\", \"confirmed2\"), \n",
    "           Seq(\"whn\", \"name\"), joinType=\"left\")\n",
    "     .withColumn(\"new\", col(\"confirmed\")-col(\"confirmed2\"))\n",
    "     .select(\"name\", \"whn\", \"new\")\n",
    "     .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6. RANK()\n",
    "\n",
    "The query shown shows the number of confirmed cases together with the world ranking for cases.\n",
    "\n",
    "United States has the highest number, Spain is number 2...\n",
    "\n",
    "Notice that while Spain has the second highest confirmed cases, Italy has the second highest number of deaths due to the virus.\n",
    "\n",
    "**Include the ranking for the number of deaths in the table. Only include countries with a population of at least 10 million.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>confirmed</th><th>rc1</th><th>deaths</th><th>rc2</th>\n",
       "                </tr>\n",
       "                <tr><td>United States</td><td>784326</td><td>1</td><td>42094</td><td>1</td></tr><tr><td>Spain</td><td>200210</td><td>2</td><td>20852</td><td>3</td></tr><tr><td>Italy</td><td>181228</td><td>3</td><td>24114</td><td>2</td></tr><tr><td>France</td><td>156480</td><td>4</td><td>20292</td><td>4</td></tr><tr><td>Germany</td><td>147065</td><td>5</td><td>4862</td><td>8</td></tr><tr><td>United Kingdom</td><td>125856</td><td>6</td><td>16550</td><td>5</td></tr><tr><td>Turkey</td><td>90980</td><td>7</td><td>2140</td><td>12</td></tr><tr><td>China</td><td>83817</td><td>8</td><td>4636</td><td>9</td></tr><tr><td>Iran</td><td>83505</td><td>9</td><td>5209</td><td>7</td></tr><tr><td>Russia</td><td>47121</td><td>10</td><td>405</td><td>23</td></tr><tr><td>Brazil</td><td>40743</td><td>11</td><td>2587</td><td>11</td></tr><tr><td>Belgium</td><td>39983</td><td>12</td><td>5828</td><td>6</td></tr><tr><td>Canada</td><td>37657</td><td>13</td><td>1725</td><td>13</td></tr><tr><td>Netherlands</td><td>33588</td><td>14</td><td>3764</td><td>10</td></tr><tr><td>Portugal</td><td>20863</td><td>15</td><td>735</td><td>15</td></tr><tr><td>India</td><td>18539</td><td>16</td><td>592</td><td>17</td></tr><tr><td>Peru</td><td>16325</td><td>17</td><td>445</td><td>21</td></tr><tr><td>Sweden</td><td>14777</td><td>18</td><td>1580</td><td>14</td></tr><tr><td>Japan</td><td>10797</td><td>19</td><td>236</td><td>27</td></tr><tr><td>South Korea</td><td>10674</td><td>20</td><td>236</td><td>27</td></tr><tr><td>Chile</td><td>10507</td><td>21</td><td>139</td><td>35</td></tr><tr><td>Saudi Arabia</td><td>10484</td><td>22</td><td>103</td><td>38</td></tr><tr><td>Ecuador</td><td>10128</td><td>23</td><td>507</td><td>19</td></tr><tr><td>Poland</td><td>9593</td><td>24</td><td>380</td><td>25</td></tr><tr><td>Romania</td><td>8936</td><td>25</td><td>478</td><td>20</td></tr><tr><td>Pakistan</td><td>8418</td><td>26</td><td>176</td><td>32</td></tr><tr><td>Mexico</td><td>8261</td><td>27</td><td>686</td><td>16</td></tr><tr><td>Czech Republic</td><td>6900</td><td>28</td><td>194</td><td>30</td></tr><tr><td>Indonesia</td><td>6760</td><td>29</td><td>590</td><td>18</td></tr><tr><td>Australia</td><td>6547</td><td>30</td><td>67</td><td>42</td></tr><tr><td>Philippines</td><td>6459</td><td>31</td><td>428</td><td>22</td></tr><tr><td>Ukraine</td><td>5710</td><td>32</td><td>151</td><td>33</td></tr><tr><td>Malaysia</td><td>5425</td><td>33</td><td>89</td><td>40</td></tr><tr><td>Dominican Republic</td><td>4964</td><td>34</td><td>235</td><td>29</td></tr><tr><td>Colombia</td><td>3977</td><td>35</td><td>189</td><td>31</td></tr><tr><td>Egypt</td><td>3333</td><td>36</td><td>250</td><td>26</td></tr><tr><td>South Africa</td><td>3300</td><td>37</td><td>58</td><td>43</td></tr><tr><td>Morocco</td><td>3046</td><td>38</td><td>143</td><td>34</td></tr><tr><td>Bangladesh</td><td>2948</td><td>39</td><td>101</td><td>39</td></tr><tr><td>Argentina</td><td>2941</td><td>40</td><td>136</td><td>36</td></tr><tr><td>Thailand</td><td>2792</td><td>41</td><td>47</td><td>44</td></tr><tr><td>Algeria</td><td>2718</td><td>42</td><td>384</td><td>24</td></tr><tr><td>Greece</td><td>2245</td><td>43</td><td>116</td><td>37</td></tr><tr><td>Kazakhstan</td><td>1852</td><td>44</td><td>19</td><td>54</td></tr><tr><td>Uzbekistan</td><td>1627</td><td>45</td><td>5</td><td>68</td></tr><tr><td>Iraq</td><td>1574</td><td>46</td><td>82</td><td>41</td></tr><tr><td>Azerbaijan</td><td>1436</td><td>47</td><td>19</td><td>54</td></tr><tr><td>Cameroon</td><td>1163</td><td>48</td><td>42</td><td>45</td></tr><tr><td>Cuba</td><td>1087</td><td>49</td><td>36</td><td>48</td></tr><tr><td>Ghana</td><td>1042</td><td>50</td><td>9</td><td>60</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(covid.join(world.select(\"name\", \"population\"), Seq(\"name\"))\n",
    "    .filter((col(\"whn\")===\"2020-04-20\") && \n",
    "            (col(\"population\")>=1e7))\n",
    "    .withColumn(\"rc1\", rank().over(Window.orderBy(col(\"confirmed\").desc)\n",
    "                                .partitionBy(\"whn\")))\n",
    "    .withColumn(\"rc2\", rank().over(Window.orderBy(col(\"deaths\").desc)\n",
    "                                  .partitionBy(\"whn\")))\n",
    "    .select(\"name\", \"confirmed\", \"rc1\", \"deaths\", \"rc2\")\n",
    "    .orderBy(col(\"confirmed\").desc)\n",
    "    .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7. Infection rate\n",
    "\n",
    "The query shown includes a JOIN t the world table so we can access the total population of each country and calculate infection rates (in cases per 100,000).\n",
    "\n",
    "**Show the infect rate ranking for each country. Only include countries with a population of at least 10 million.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>name</th>\n",
       "      <th>r_inf</th>\n",
       "      <th>rc</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>China</td>\n",
       "      <td>6.0</td>\n",
       "      <td>50</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>India</td>\n",
       "      <td>1.0</td>\n",
       "      <td>23</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>United States</td>\n",
       "      <td>238.0</td>\n",
       "      <td>87</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>Indonesia</td>\n",
       "      <td>3.0</td>\n",
       "      <td>35</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>Pakistan</td>\n",
       "      <td>4.0</td>\n",
       "      <td>41</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>85</th>\n",
       "      <td>Jordan</td>\n",
       "      <td>4.0</td>\n",
       "      <td>41</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>86</th>\n",
       "      <td>Dominican Republic</td>\n",
       "      <td>48.0</td>\n",
       "      <td>73</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>87</th>\n",
       "      <td>Sweden</td>\n",
       "      <td>143.0</td>\n",
       "      <td>81</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>88</th>\n",
       "      <td>Portugal</td>\n",
       "      <td>203.0</td>\n",
       "      <td>85</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>89</th>\n",
       "      <td>Azerbaijan</td>\n",
       "      <td>14.0</td>\n",
       "      <td>62</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>90 rows × 3 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "                  name  r_inf  rc\n",
       "0                China    6.0  50\n",
       "1                India    1.0  23\n",
       "2        United States  238.0  87\n",
       "3            Indonesia    3.0  35\n",
       "4             Pakistan    4.0  41\n",
       "..                 ...    ...  ..\n",
       "85              Jordan    4.0  41\n",
       "86  Dominican Republic   48.0  73\n",
       "87              Sweden  143.0  81\n",
       "88            Portugal  203.0  85\n",
       "89          Azerbaijan   14.0  62\n",
       "\n",
       "[90 rows x 3 columns]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# a was obtained in #6\n",
    "(a.withColumn(\"r_inf\", round(1e5*a(\"confirmed\")/a(\"population\")))\n",
    "     .withColumn(\"rc\", rank().over(Window.orderBy(col(\"r_inf\"))\n",
    "                                  .partitionBy(\"whn\")))\n",
    "     .orderBy(col(\"population\").desc())\n",
    "     .select(\"name\", \"r_inf\", \"rc\")\n",
    "     .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 8. Turning the corner\n",
    "\n",
    "For each country that has had at last 1000 new cases in a single day, show the date of the peak number of new cases."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>whn</th><th>new</th>\n",
       "                </tr>\n",
       "                <tr><td>China</td><td>2020-02-13</td><td>15136</td></tr><tr><td>Italy</td><td>2020-03-21</td><td>6557</td></tr><tr><td>Switzerland</td><td>2020-03-23</td><td>1321</td></tr><tr><td>Israel</td><td>2020-03-25</td><td>1131</td></tr><tr><td>Spain</td><td>2020-03-25</td><td>9630</td></tr><tr><td>Austria</td><td>2020-03-26</td><td>1321</td></tr><tr><td>Germany</td><td>2020-03-27</td><td>6933</td></tr><tr><td>Iran</td><td>2020-03-30</td><td>3186</td></tr><tr><td>Canada</td><td>2020-04-05</td><td>2778</td></tr><tr><td>Netherlands</td><td>2020-04-10</td><td>1346</td></tr><tr><td>Ireland</td><td>2020-04-10</td><td>1515</td></tr><tr><td>Portugal</td><td>2020-04-10</td><td>1516</td></tr><tr><td>Ecuador</td><td>2020-04-10</td><td>2196</td></tr><tr><td>United Kingdom</td><td>2020-04-10</td><td>8733</td></tr><tr><td>United States</td><td>2020-04-10</td><td>33755</td></tr><tr><td>Turkey</td><td>2020-04-11</td><td>5138</td></tr><tr><td>France</td><td>2020-04-12</td><td>26849</td></tr><tr><td>Peru</td><td>2020-04-13</td><td>2265</td></tr><tr><td>Belgium</td><td>2020-04-15</td><td>2454</td></tr><tr><td>Japan</td><td>2020-04-17</td><td>1161</td></tr><tr><td>Brazil</td><td>2020-04-17</td><td>3257</td></tr><tr><td>Saudi Arabia</td><td>2020-04-18</td><td>1132</td></tr><tr><td>India</td><td>2020-04-19</td><td>1893</td></tr><tr><td>Russia</td><td>2020-04-19</td><td>6060</td></tr><tr><td>Singapore</td><td>2020-04-20</td><td>1426</td></tr><tr><td>Belarus</td><td>2020-04-20</td><td>1485</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(covid.withColumn(\"new\", (covid(\"confirmed\")-lag(col(\"confirmed\"), 1).over(\n",
    "    Window.partitionBy(\"name\").orderBy(\"whn\"))))\n",
    "     .na.fill(0, Array(\"new\"))\n",
    "     .withColumn(\"rc\", rank().over(Window.partitionBy(\"name\").orderBy(col(\"new\").desc)))\n",
    "     .filter((col(\"rc\")===1) && (col(\"new\")>1000))\n",
    "     .select(\"name\", \"whn\", \"new\")\n",
    "     .orderBy(\"whn\", \"new\")\n",
    "     .showHTML())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
